package com.asiainfo.dacp.dp.executor.utils;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.etlmaster.executor.realtime.kafka.KafkaHelper;
import com.etlmaster.executor.utils.LogWriter;



/**
 * Manual Kafka round-trip driver: repeatedly publishes a batch of test
 * messages to a topic and then polls the same topic and logs what came back.
 *
 * <p>Despite the class name, nothing in this class deals with zip files; the
 * name is kept unchanged so existing launch configurations keep working.
 *
 * <p>The loop in {@link #main(String[])} never terminates on its own; the
 * process has to be stopped externally.
 */
public class zipFile {
	/** Topic that is both written to and read from. */
	private static final String TOPIC = "dp";

	/** Pattern of the timestamp embedded in every message value. */
	private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

	/** Number of messages sent in each cycle; the loop index is used as the record key. */
	private static final int MESSAGES_PER_BATCH = 100;

	/** Pause (milliseconds) applied after sending and again after consuming. */
	private static final long PAUSE_MILLIS = 6000;

	/** Timeout (milliseconds) passed to the consumer's {@code poll} call. */
	private static final long POLL_TIMEOUT_MILLIS = 1000;

	/**
	 * Runs the send / pause / poll / pause cycle forever.
	 *
	 * @param args ignored
	 * @throws Exception if sending, polling, logging or sleeping fails
	 *         (including {@link InterruptedException} from {@link Thread#sleep(long)})
	 */
	public static void main(String[] args) throws Exception {
		// Built once instead of once per message. SimpleDateFormat is not
		// thread-safe, but this formatter never leaves the main thread.
		SimpleDateFormat timestampFormat = new SimpleDateFormat(DATE_PATTERN);

		while (true) {
			for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
				String date = timestampFormat.format(new Date());
				// NOTE(review): getProducer() is looked up for every record, as in the
				// original; whether KafkaHelper caches the instance is not visible here.
				KafkaHelper.getProducer().send(new ProducerRecord<String, String>(
						TOPIC, i + "", i + date + "dp test message"));
			}

			// The "done" line is logged only after the pause, matching the original ordering.
			Thread.sleep(PAUSE_MILLIS);
			LogWriter.addLog("INFO", "send message done");

			// NOTE(review): getConsumer() is called once per cycle; confirm it reuses a
			// consumer rather than creating a new one that is never closed.
			ConsumerRecords<String, String> records =
					KafkaHelper.getConsumer(TOPIC).poll(POLL_TIMEOUT_MILLIS);

			LogWriter.addLog("INFO", "fetch :" + records.count());

			for (ConsumerRecord<String, String> record : records) {
				LogWriter.addLog("INFO", " offset = {} , key = {}, value = {}",
						Long.toString(record.offset()), record.key(), record.value());
			}

			Thread.sleep(PAUSE_MILLIS);
		}
	}
}
